package com.atuguigu.flink.Day08;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Append-stream example.
 *
 * <p>Reads {@link SendsorReading} records from a {@link SensorSource}, projects the
 * {@code id} and {@code temp} columns twice (once via the Table API, once via SQL),
 * converts each result back to an append stream of {@link Row} and prints it.
 */
public class Example3 {

    /**
     * Builds the pipeline and runs it.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        final DataStreamSource<SendsorReading> readings = env.addSource(new SensorSource());

        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // ---- Part 1: Table API ----
        // "temperture" is spelled exactly as in the original; presumably it matches the
        // field name declared on SendsorReading -- confirm before "fixing" the spelling.
        final Table sensorTable = tableEnv.fromDataStream(
                readings,
                $("id"),
                $("temperture").as("temp"),
                $("timestamp").as("ts"));

        // NOTE(review): toAppendStream appears to be deprecated in newer Flink releases in
        // favor of toDataStream -- confirm against the Flink version this project targets.
        tableEnv
                .toAppendStream(sensorTable.select($("id"), $("temp")), Row.class)
                .print();

        // ---- Part 2: SQL ----
        // Register the same stream under the view name "sensor" so it can be queried by SQL.
        tableEnv.createTemporaryView(
                "sensor",
                readings,
                $("id"),
                $("temperture").as("temp"),
                $("timestamp").as("ts"));

        tableEnv
                .toAppendStream(tableEnv.sqlQuery("SELECT id,temp FROM sensor"), Row.class)
                .print();

        // Both print sinks are attached to the same environment and run in this one job.
        env.execute();
    }
}
